Solutions/WithSecureElementsViaFunction/Data Connectors/WithSecureElementsAzureFunction/lib/withsecure_client.py (146 lines of code) (raw):

import logging from base64 import b64encode from pathlib import PurePosixPath, PureWindowsPath log = logging.getLogger(__name__) class WithSecureClient: def __init__( self, elements_api_url, client_id, client_secret, engine, engine_group, rest_client, max_events_count, ): self._client_id = client_id self._client_secret = client_secret self._api_url = elements_api_url self._events_path = "/security-events/v1/security-events" self._engine = engine self._engine_group = engine_group self.rest = rest_client self.max_events_count = max_events_count def get_events_after(self, from_date): token = self._authenticate() return self._get_events_after(token, from_date) def _authenticate(self): auth_header = b64encode( bytes(self._client_id + ":" + self._client_secret, "utf-8") ).decode("utf-8") headers = { "Content-type": "application/x-www-form-urlencoded;charset=UTF-8", "Accept": "application/json", "Authorization": "Basic " + auth_header, "User-Agent": "sentinel-connector", } data = {"grant_type": "client_credentials", "scope": "connect.api.read"} header_keys_to_log = ["Content-type", "Accept", "User-Agent"] headers_to_log = {key: headers[key] for key in header_keys_to_log} log.info( f"Executing authorization request to Elements API... " f"headers={headers_to_log}, " f"body={data}" ) response = self.rest.post( self._api_url + "/as/token.oauth2", data=data, headers=headers ) log.info("Response headers=" + str(response.headers)) if response.ok: res_body = response.json() return res_body["access_token"] else: log.info("Response=" + response.text) log.info("Transaction-id=" + response.headers.get("X-Transaction")) raise Exception("Authentication failed") def _get_events_after(self, auth_token, from_date, org_id=None): next_page = None fetch_page = True log.info(f"Reading events created after {from_date}") all_events = [] while fetch_page and len(all_events) < self.max_events_count: page = self._get_events_page(auth_token, from_date, org_id, next_page) next_page = page.get("nextAnchor") fetch_page = next_page is not None for event in page["items"]: all_events.append(SecurityEvent(**event)) if len(all_events) >= self.max_events_count: fetch_page = False break return all_events def _get_events_page(self, auth_token, from_date, org_id=None, next_page=None): headers = { "Accept": "application/json", "Authorization": "Bearer " + auth_token, "User-Agent": "my-script", } log.info("engine: %s " % self._engine) log.info("engine_group: %s " % self._engine_group) if self._engine and self._engine != "default": engine_param = "engine" engine_param_value = self._engine elif self._engine_group and self._engine_group != "default": engine_param = "engineGroup" engine_param_value = self._engine_group else: engine_param = "engineGroup" engine_param_value = "epp,edr,ecp" data = { "limit": 100, "persistenceTimestampStart": from_date, "order": "asc", engine_param: engine_param_value, "exclusiveStart": "true", } if next_page: data["anchor"] = next_page if org_id: data["organizationId"] = org_id headers_to_log = dict(headers) headers_to_log["Authorization"] = "REDACTED" log.info( f"Reading security events after {from_date}. Request={data}, request headers={headers_to_log}" ) response = self.rest.post( self._api_url + self._events_path, data=data, headers=headers ) tx_id = response.headers.get("X-Transaction", "unknown") log.info(f"Elements API response. headers={response.headers}, tx-id={tx_id}") if not response.ok: log.info("Error %s", response.text) raise Exception("Request error") return response.json() class SecurityEvent: def __init__(self, **kwargs): self.details = kwargs.get("details", {}) self.device = kwargs.get("device", {}) for k, v in kwargs.items(): if not hasattr(self, k): setattr(self, k, v) def infection_name(self): name = self.details.get("infectionName", "") return self.details.get("name", name) def file_path(self): if "filePath" in self.details: return self.details["filePath"] elif "object" in self.details: return self.details["object"] else: return self.details.get("path", "") def host_name(self): if not self.device: return None if "name" in self.device: return self.device.get("name") elif "id" in self.device: return self.device.get("id") elif "winsAddress" in self.device: return self.device.get("winsAddress") return None def _file_name(self, key): posix = PureWindowsPath(self.details.get(key, "")).as_posix() return PurePosixPath(posix).name def process_name(self): return self._file_name("process") def file_name(self): return self._file_name("filePath")